Spark enables data scientists to tackle problems with larger data sizes than they could handle before with tools like R or Pandas.
First of all, check that PySpark is running properly and is correctly loaded. In case it is not, you can follow these posts:
In [ ]:
import pyspark
sc = pyspark.SparkContext(appName="my_spark_app")
The first thing to note is that with Spark all computation is parallelized by means of distributed data structures that are spread across the cluster. These collections are called Resilient Distributed Datasets (RDDs). We will talk more about RDDs, as they are the main building block in Spark.
As we have successfully loaded the Spark Context, we are ready to do some interactive analysis. We can read a simple file:
In [ ]:
lines = sc.textFile("../data/people.csv")
lines.count()
In [ ]:
lines.first()
This is a very simple first example, where we create an RDD (the variable lines) and then apply some operations (count and first) in a parallel manner. Note that, as we are running all our examples on a single computer, no real parallelization takes place.
In the next section we will cover the core Spark concepts that allow Spark users to do parallel computation.
We will talk about Spark applications that are in charge of loading data and applying some distributed computation over it. Every application has a driver program that launches parallel operations to the cluster. In the case of interactive programming, the driver program is the shell (or Notebook) itself.
The "access point" to Spark from the driver program is the Spark Context object.
Once we have a Spark Context we can use it to build RDDs. In the previous examples we used sc.textFile() to represent the lines of the text file, and then we ran different operations over the RDD lines.
To run these operations over RDDs, the driver program manages worker processes called executors. For example, for the count operation, different executors can count different ranges of the file.
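As a side note, the degree of local parallelism is chosen when the Spark Context is created. A minimal sketch (assuming we create the context ourselves rather than reusing the one above): the master URL "local[4]" runs Spark locally with 4 worker threads standing in for executors.
In [ ]:
# a minimal sketch: "local[4]" runs Spark locally with 4 worker threads standing in for executors
# getOrCreate() simply returns the already running context if one exists
conf = pyspark.SparkConf().setMaster("local[4]").setAppName("my_spark_app")
sc = pyspark.SparkContext.getOrCreate(conf)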
Spark's API allows passing functions to its operators to run them on the cluster. For example, we could extend our example by filtering the lines in the file that contain a word, such as individuum.
In [ ]:
lines = sc.textFile("../data/people.csv")
filtered_lines = lines.filter(lambda line: "individuum" in line)
filtered_lines.first()
An RDD can be defined as a distributed collection of elements.
All work done with Spark can be summarized as creating, transforming and applying operations over RDDs to compute a result.
Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.
RDD properties:
An RDD can be created in two ways: by loading an external dataset, or by applying a transformation to an existing RDD.
We have already seen the two ways of creating an RDD.
In [ ]:
# loading an external dataset
lines = sc.textFile("../data/people.csv")
print(type(lines))
In [ ]:
# applying a transformation to an existing RDD
filtered_lines = lines.filter(lambda line: "individuum" in line)
print(type(filtered_lines))
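Besides these, an RDD can also be built from an in-memory Python collection with sc.parallelize(), which we will use later for small examples; a minimal sketch:
In [ ]:
# building an RDD from an in-memory Python collection
numbers = sc.parallelize([1, 2, 3, 4, 5])
print(type(numbers))
print(numbers.collect())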
It is important to note that once we have an RDD, we can run two kinds of operations: transformations and actions.
Notice that when we print it, it shows that it is an RDD and that its type is PipelinedRDD, not a list of values as we might expect. That's because we haven't performed an action yet; we have only performed a transformation.
In [ ]:
# if we print lines we get only this
print(lines)
In [ ]:
# when we perform an action, then we get the result
action_result = lines.first()
print(type(action_result))
action_result
Transformations and actions are very different because of the way Spark computes RDDs.
Transformations are evaluated lazily; that is, they are only computed once they are used in an action.
In [ ]:
# filtered_lines is not computed until the next action is applied over it
# it makes sense when working with big data sets, as it is not necessary to
# transform the whole RDD to get an action over a subset
# Spark doesn't even read the complete file!
filtered_lines.first()
The drawback is that Spark recomputes the RDD each time an action is applied to it.
This means that the computing effort over an already computed RDD may be lost.
To mitigate this drawback, the user can decide to persist the RDD after computing it the first time: Spark will store the RDD contents in memory (partitioned across the machines in your cluster) and reuse them in future actions.
Persisting RDDs on disk instead of memory is also possible.
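As a minimal sketch of persisting on disk, using the filtered_lines RDD from above and the storage levels exposed by PySpark:
In [ ]:
from pyspark import StorageLevel
# persist() with no arguments keeps the RDD in memory (MEMORY_ONLY);
# passing an explicit storage level stores it on disk instead
filtered_lines.persist(StorageLevel.DISK_ONLY)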
Let's see an example on the impact of persisting:
In [ ]:
import time
lines = sc.textFile("../data/REFERENCE/*")
lines_nonempty = lines.filter( lambda x: len(x) > 0 )
words = lines_nonempty.flatMap(lambda x: x.split())
words_persisted = lines_nonempty.flatMap(lambda x: x.split())
t1 = time.time()
words.count()
print("Word count 1:",time.time() - t1)
t1 = time.time()
words.count()
print("Word count 2:",time.time() - t1)
t1 = time.time()
words_persisted.persist()
words_persisted.count()
print("Word count persisted 1:",time.time() - t1)
t1 = time.time()
words_persisted.count()
print("Word count persisted 2:", time.time() - t1)
We have already seen that RDDs have two basic operations: transformations and actions.
Transformations are operations that return a new RDD. Examples: filter, map.
Remember that transformed RDDs are computed lazily, only when you use them in an action.
Lazy evaluation means that when we call a transformation on an RDD (for instance, calling map()), the operation is not immediately performed.
Instead, Spark internally records metadata to indicate that this operation has been requested.
Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call sc.textFile(), the data is not loaded until it is necessary.
As with transformations, the operation (in this case, reading the data) can occur multiple times. Keep in mind that transformations DO HAVE an impact on computation time.
Many transformations are element-wise; that is, they work on one element at a time; but this is not true for all transformations.
In [ ]:
# load a file
lines = sc.textFile("../data/REFERENCE/*")
# make a transformation filtering positive length lines
lines_nonempty = lines.filter( lambda x: len(x) > 0 )
print("-> lines_nonepmty is: {} and if we print it we get\n {}".format(type(lines_nonempty), lines_nonempty))
# we transform again
words = lines_nonempty.flatMap(lambda x: x.split())
print("-> words is: {} and if we print it we get\n {}".format(type(words), words))
words_persisted = lines_nonempty.flatMap(lambda x: x.split())
print("-> words_persisted is: {} and if we print it we get\n {}".format(type(words_persisted), words_persisted))
final_result = words.take(10)
print("-> final_result is: {} and if we print it we get\n {}".format(type(final_result), final_result))
In [ ]:
import time
# we record the initial time
t1 = time.time()
words.count()
# and measure the time spent on the computation
print("Word count 1:",time.time() - t1)
t1 = time.time()
words.count()
print("Word count 2:",time.time() - t1)
t1 = time.time()
words_persisted.persist()
words_persisted.count()
print("Word count persisted 1:",time.time() - t1)
t1 = time.time()
words_persisted.count()
print("Word count persisted 2:", time.time() - t1)
Actions are the operations that return a final value to the driver program or write data to an external storage system.
Actions force the evaluation of the transformations required for the RDD they were called on, since they need to actually produce output.
Returning to the previous example, until we call count over words and words_persisted, the RDDs are not computed. Note that we persisted words_persisted, and only at its second computation can we see the impact of persisting that RDD in memory.
If we want to see a part of the RDD, we can use take, and to retrieve the full RDD we can use collect.
In [ ]:
lines = sc.textFile("../data/people.csv")
print("-> Three elements:\n", lines.take(3))
print("-> The whole RDD:\n", lines.collect())
Question: Why is it not a good idea to collect an RDD?
Most of Spark’s transformations, and some of its actions, depend on passing in functions that are used by Spark to compute data.
In Python, we have three options for passing functions into Spark: lambda expressions, top-level functions, and locally defined functions.
In [ ]:
lines = sc.textFile("../data/people.csv")
# we create a lambda function to apply to all lines of the dataset
# WARNING, see that after splitting we get only the first element
first_cells = lines.map(lambda x: x.split(",")[0])
print(first_cells.collect())
# we can define a function as well
def get_cell(x):
    return x.split(",")[0]
first_cells = lines.map(get_cell)
print(first_cells.collect())
The two most common transformations you will likely be using are map and filter.
The map() transformation takes in a function and applies it to each element in the RDD with the result of the function being the new value of each element in the resulting RDD.
The filter() transformation takes in a function and returns an RDD that only has elements that pass the filter() function.
Sometimes map() returns nested lists; to flatten these nested lists we can use flatMap().
With flatMap(), the function we pass is called individually for each element in our input RDD. Instead of returning a single element, it returns an iterator with the return values. Rather than producing an RDD of iterators, we get back an RDD that consists of the elements from all of the iterators.
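A minimal sketch of the difference between map(), flatMap() and filter(), using a small parallelized collection instead of our files:
In [ ]:
phrases = sc.parallelize(["hello spark", "hello big data"])
# map: one output element per input element (here, a list of words per phrase)
print(phrases.map(lambda x: x.split(" ")).collect())
# flatMap: the returned lists are flattened into a single RDD of words
print(phrases.flatMap(lambda x: x.split(" ")).collect())
# filter: keep only the elements for which the function returns True
print(phrases.filter(lambda x: "spark" in x).collect())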
The distinct() transformation produces a new RDD with only distinct items.
Note that distinct() is expensive, however, as it requires shuffling all the data over the network to ensure that we receive only one copy of each element.
RDD.union(other) gives back an RDD consisting of the data from both sources.
Unlike the mathematical union(), if there are duplicates in the input RDDs, the result of Spark’s union() will contain duplicates (which we can fix if desired with distinct()).
RDD.intersection(other) returns only elements in both RDDs. intersection() also removes all duplicates (including duplicates from a single RDD) while running.
While intersection() and union() are two similar concepts, the performance of intersection() is much worse since it requires a shuffle over the network to identify common elements.
RDD.cartesian(other) transformation returns all possible pairs of (a,b) where a is in the source RDD and b is in the other RDD.
The Cartesian product can be useful when we wish to consider the similarity between all possible pairs, such as computing every user’s expected interest in each offer. We can also take the Cartesian product of an RDD with itself, which can be useful for tasks like user similarity. Be warned, however, that the Cartesian product is very expensive for large RDDs.
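The following sketch illustrates these pseudo set operations on two small parallelized RDDs (the order of the collected results may vary):
In [ ]:
rdd_a = sc.parallelize([1, 2, 2, 3])
rdd_b = sc.parallelize([3, 4])
print(rdd_a.distinct().collect())            # duplicates removed
print(rdd_a.union(rdd_b).collect())          # keeps duplicates: 4 + 2 = 6 elements
print(rdd_a.intersection(rdd_b).collect())   # only elements present in both: [3]
print(rdd_a.cartesian(rdd_b).collect())      # all (a, b) pairs: 4 x 2 = 8 of them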
We have a file (../data/books.csv) with a lot of links to books. We want to perform an analysis of the books and their contents.
Exercise 1: Download all books from books.csv using the map function.
Exercise 2: Identify transformations and actions. When is the returned data calculated?
Exercise 3: Imagine that you only want to download Dickens' books; how would you do that? What is the impact of not persisting dickens_books_content?
Exercise 4: Use flatMap() on the resulting RDD of the previous exercise; how is the result different?
Exercise 5: You want to know which different book authors there are.
Exercise 6: Return Poe's and Dickens' books URLs (use union function).
Exercise 7: Return the list of books without Dickens' and Poe's books.
Exercise 8: Count the number of books using the reduce function.
For the following two exercises, we will use ../data/Sacramentorealestatetransactions.csv
Exercise 9: Compute the mean price of estates from the CSV containing Sacramento's estate prices, using the aggregate function.
Exercise 10: Get the top 5 highest and lowest prices in Sacramento's estate transactions.
Exercise 1: Download all books from books.csv using the map function.
Answer 1:
In [ ]:
import urllib3
def download_file(csv_line):
    link = csv_line[0]
    http = urllib3.PoolManager()
    r = http.request('GET', link, preload_content=False)
    response = r.read()
    return response
In [ ]:
books_info = sc.textFile("../data/books.csv").map(lambda x: x.split(","))
print(books_info.take(10))
In [ ]:
books_content = books_info.map(download_file)
print(books_content.take(10)[1][:100])
Exercise 2: Identify transformations and actions. When is the returned data calculated?
Answer 2: If we consider the text reading as a transformation...
Transformations: sc.textFile() (if we count the reading as one), the map() with the split, and the map() with download_file.
Actions: take().
Computation is carried out in actions. In this case we take advantage of it: the download function is only applied to the elements that the take() action needs from the books_content RDD.
Exercise 3: Imagine that you only want to download Dickens' books; how would you do that? What is the impact of not persisting dickens_books_content?
Answer 3:
In [ ]:
import re
def is_dickens(csv_line):
    link = csv_line[0]
    t = re.match(r"http://www.textfiles.com/etext/AUTHORS/DICKENS/", link)
    return t is not None
dickens_books_info = books_info.filter(is_dickens)
print(dickens_books_info.take(4))
dickens_books_content = dickens_books_info.map(download_file)
# take into consideration that each time an action is performed over dickens_books_content, the files are downloaded again
# this has a big impact on computation time
print(dickens_books_content.take(2)[1][:100])
Exercise 4: Use flatMap() on the resulting RDD of the previous exercise; how is the result different?
Answer 4:
In [ ]:
flat_content = dickens_books_info.map(lambda x: x)
print(flat_content.take(4))
In [ ]:
flat_content = dickens_books_info.flatMap(lambda x: x)
print(flat_content.take(4))
Exercise 5: You want to know which different book authors there are.
Answer 5:
In [ ]:
def get_author(csv_line):
    link = csv_line[0]
    t = re.match(r"http://www.textfiles.com/etext/AUTHORS/(\w+)/", link)
    if t:
        return t.group(1)
    return u'UNKNOWN'
authors = books_info.map(get_author)
authors.distinct().collect()
Exercise 6: Return Poe's and Dickens' books URLs (use union function).
Answer 6
In [ ]:
import re
def get_author_and_link(csv_line):
    link = csv_line[0]
    t = re.match(r"http://www.textfiles.com/etext/AUTHORS/(\w+)/", link)
    if t:
        return (t.group(1), link)
    return (u'UNKNOWN', link)
authors_links = books_info.map(get_author_and_link)
# not very efficient
dickens_books = authors_links.filter(lambda x: x[0]=="DICKENS")
poes_books = authors_links.filter(lambda x: x[0]=="POE")
poes_dickens_books = poes_books.union(dickens_books)
# sample is a transformation that returns an RDD sampled over the original RDD
# https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html
poes_dickens_books.sample(True,0.05).collect()
In [ ]:
# takeSample is an action, returning a sampled subset of the RDD
poes_dickens_books.takeSample(True,10)
Exercise 7: Return the list of books without Dickens' and Poe's books.
Answer 7:
In [ ]:
authors_links.subtract(poes_dickens_books).map(lambda x: x[0]).distinct().collect()
Exercise 8: Count the number of books using the reduce function.
Answer 8
In [ ]:
authors_links.map(lambda x: 1).reduce(lambda x,y: x+y) == authors_links.count()
In [ ]:
# let's see this approach more in detail
# this transformation generates an rdd of 1, one per element in the RDD
authors_map = authors_links.map(lambda x: 1)
authors_map.takeSample(True,10)
# with reduce, we pass a function with two parameters which is applied by pairs
# inside the function we specify which operation we perform with the two parameters
# the result is then returned and the function is applied again using that result until there is only one element left
In [ ]:
# this is a very efficient way to do a summation in parallel
# using a functional approach
# we could define any operation inside the function
authors_map.reduce(lambda x,y: x*y)
Exercise 9: Compute the mean price of estates from the CSV containing Sacramento's estate prices, using the aggregate function.
Answer 9
In [ ]:
sacramento_estate_csv = sc.textFile("../data/Sacramentorealestatetransactions.csv")
header = sacramento_estate_csv.first()
# first load the data
# we know that the price is in column 9
sacramento_estate = sacramento_estate_csv.filter(lambda x: x != header)\
.map(lambda x: x.split(","))\
.map(lambda x: int(x[9]))
sacramento_estate.takeSample(True, 10)
In [ ]:
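# aggregate() folds the RDD into an accumulator of the form (running_sum, count), starting from the zero value (0, 0):
# seqOp merges one price into a partition's accumulator,
# combOp merges the accumulators coming from the different partitions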
seqOp = (lambda x,y: (x[0] + y, x[1] + 1))
combOp = (lambda x,y: (x[0] + y[0], x[1] + y[1]))
total_sum, number = sacramento_estate.aggregate((0,0),seqOp,combOp)
mean = float(total_sum)/number
mean
Exercise 10: Get the top 5 highest and lowest prices in Sacramento's estate transactions.
Answer 10
In [ ]:
print(sacramento_estate.top(5))
print(sacramento_estate.top(5, key=lambda x: -x))
Spark provides special operations on RDDs containing key/value pairs.
These RDDs are called pair RDDs, but they are simply RDDs with a special structure. In Python, for the functions on keyed data to work, we need to return an RDD composed of tuples.
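A minimal sketch of a pair RDD built directly from a small collection of (key, value) tuples:
In [ ]:
# a pair RDD is just an RDD whose elements are 2-tuples: (key, value)
pairs = sc.parallelize([("DICKENS", "A Tale of Two Cities"), ("POE", "The Raven")])
print(pairs.keys().collect())
print(pairs.values().collect())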
Exercise 1: Create a pair RDD from our books information data, having author as key and the rest of the information as value. (Hint: the answer is very similar to the previous section Exercise 6)
Exercise 2: Check that pair RDDs are also RDDs and that common RDD operations work as well. Filter out the elements whose author equals "UNKNOWN" from the previous RDD.
Exercise 3: Check the mapValues() function in the Spark API (http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mapValues), which works on pair RDDs.
Exercise 1: Create a pair RDD from our books information data, having author as key and the rest of the information as value. (Hint: the answer is very similar to the previous section Exercise 6)
Answer 1:
In [ ]:
import re
def get_author_data(csv_line):
    link = csv_line[0]
    t = re.match(r"http://www.textfiles.com/etext/AUTHORS/(\w+)/", link)
    if t:
        return (t.group(1), csv_line)
    return (u'UNKNOWN', csv_line)
books_info = sc.textFile("../data/books.csv").map(lambda x: x.split(","))
authors_info = books_info.map(get_author_data)
print(authors_info.take(5))
Exercise 2: Check that pair RDDs are also RDDs and that common RDD operations work as well. Filter out the elements whose author equals "UNKNOWN" from the previous RDD.
Answer 2:
The operations over pair RDDs will also be slightly different.
But take into account that pair RDDs are just special RDDs to which some extra operations can be applied; common RDD operations also work for them.
In [ ]:
authors_info.filter(lambda x: x[0] != "UNKNOWN").take(3)
Exercise 3: Check the mapValues() function in the Spark API (http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mapValues), which works on pair RDDs.
Answer 3:
Sometimes it is awkward to work with whole pairs, so Spark provides a map function that operates only over the values.
In [ ]:
authors_info.mapValues(lambda x: x[2]).take(5)
Since pair RDDs contain tuples, we need to pass functions that operate on tuples rather than on individual elements.
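For instance, a small sketch with hypothetical (author, size) pairs: the lambda receives the whole tuple and we index into it to reach the key or the value.
In [ ]:
pairs = sc.parallelize([("DICKENS", 135), ("POE", 74), ("TWAIN", 90)])
# kv is the full (key, value) tuple
print(pairs.filter(lambda kv: kv[1] > 80).collect())
print(pairs.map(lambda kv: (kv[0], kv[1] * 2)).collect())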
Exercise 1: Get the total size of files for each author.
Exercise 2: Get the top 5 authors with the most data.
Exercise 3: Try combineByKey() with a randomly generated set of 5 values for each of 5 keys. Get the average value of the random variable for each key.
Exercise 4: Compute the average book size per author using combineByKey(). If you were an English Literature student and your teacher says: "Pick one Author and I'll randomly pick a book for you to read", what would a Data Scientist's answer be?
Exercise 5: All Spark books have the word count example. Let's count words over all our books! (This might take some time)
Exercise 6: Group author data by author surname initial. How many authors have we grouped?
Exercise 7: Generate a pair RDD with alphabet letters in upper case as key, and empty list as value. Then group the previous RDD with this new one.
Exercise 1: Get the total size of files for each author.
Answer 1
In [ ]:
# first get each book size, keyed by author
authors_data = authors_info.mapValues(lambda x: int(x[2]))
authors_data.take(5)
In [ ]:
# then reduce, summing the sizes
authors_data.reduceByKey(lambda y,x: y+x).collect()
Exercise 2: Get the top 5 authors with the most data.
Answer 2:
In [ ]:
authors_data.reduceByKey(lambda y,x: y+x).top(5,key=lambda x: x[1])
Exercise 3: Try combineByKey() with a randomly generated set of 5 values for each of 5 keys. Get the average value of the random variable for each key.
Answer 3:
In [ ]:
import numpy as np
# generate the data
for pair in list(zip(np.arange(5).tolist()*5, np.random.normal(0,1,5*5))):
    print(pair)
rdd = sc.parallelize(zip(np.arange(5).tolist()*5, np.random.normal(0,1,5*5)))
In [ ]:
createCombiner = lambda value: (value,1)
# you can check what createCombiner does
# rdd.mapValues(createCombiner).collect()
# here x is the combiner (sum,count) and value is value in the
# initial RDD (the random variable)
mergeValue = lambda x, value: (x[0] + value, x[1] + 1)
# here, all combiners are summed (sum,count)
mergeCombiner = lambda x, y: (x[0] + y[0], x[1] + y[1])
sumCount = rdd.combineByKey(createCombiner,
mergeValue,
mergeCombiner)
print(sumCount.collect())
sumCount.mapValues(lambda x: x[0]/x[1]).collect()
Exercise 4: Compute the average book size per author using combineByKey(). If you were an English Literature student and your teacher says: "Pick one Author and I'll randomly pick a book for you to read", what would a Data Scientist's answer be?
Answer 4:
In [ ]:
createCombiner = lambda value: (value,1)
# you can check what createCombiner does
# rdd.mapValues(createCombiner).collect()
# here x is the combiner (sum,count) and value is value in the
# initial RDD (the random variable)
mergeValue = lambda x, value: (x[0] + value, x[1] + 1)
# here, all combiners are summed (sum,count)
mergeCombiner = lambda x, y: (x[0] + y[0], x[1] + y[1])
sumCount = authors_data.combineByKey(createCombiner,
mergeValue,
mergeCombiner)
print(sumCount.mapValues(lambda x: x[0]/x[1]).collect())
# I would choose the author with lowest average book size
print(sumCount.mapValues(lambda x: x[0]/x[1]).top(5,lambda x: -x[1]))
Exercise 5: All Spark books have the word count example. Let's count words over all our books! (This might take some time)
Answer 5:
In [ ]:
import urllib3
import re
def download_file(csv_line):
    link = csv_line[0]
    http = urllib3.PoolManager()
    r = http.request('GET', link, preload_content=False)
    response = r.read()
    return str(response)
books_info = sc.textFile("../data/books.csv").map(lambda x: x.split(","))
#books_content = books_info.map(download_file)
# while trying the function use only two samples
books_content = sc.parallelize(books_info.map(download_file).take(2))
words_rdd = books_content.flatMap(lambda x: x.split(" ")).\
flatMap(lambda x: x.split("\r\n")).\
map(lambda x: re.sub('[^0-9a-zA-Z]+', '', x).lower()).\
filter(lambda x: x != '')
words_rdd.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y).top(5, key=lambda x: x[1])
Exercise 6: Group author data by author surname initial. How many authors have we grouped?
Answer 6:
In [ ]:
print(authors_info.groupBy(lambda x: x[0][0]).collect())
authors_info.map(lambda x: x[0]).distinct().\
map(lambda x: (x[0],1)).\
reduceByKey(lambda x,y: x+y).\
filter(lambda x: x[1]>1).\
collect()
Exercise 7: Generate a pair RDD with alphabet letters in upper case as key, and empty list as value. Then group the previous RDD with this new one.
Answer 7:
In [ ]:
import string
sc.parallelize(list(string.ascii_uppercase)).\
map(lambda x: (x,[])).\
cogroup(authors_info.groupBy(lambda x: x[0][0])).\
take(5)
Some of the most useful operations we get with keyed data comes from using it together with other keyed data.
Joining data together is probably one of the most common operations on a pair RDD, and we have a full range of options including right and left outer joins, cross joins, and inner joins.
Only keys that are present in both pair RDDs are output.
When there are multiple values for the same key in one of the inputs, the resulting pair RDD will have an entry for every possible pair of values with that key from the two input RDDs.
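A minimal sketch of this behaviour with two tiny pair RDDs: keys present in only one RDD are dropped, and a repeated key produces every combination of its values.
In [ ]:
left = sc.parallelize([("a", 1), ("b", 2), ("b", 3)])
right = sc.parallelize([("b", "x"), ("b", "y"), ("c", "z")])
# "a" and "c" are dropped; key "b" yields 2 x 2 = 4 combined pairs
print(left.join(right).collect())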
Exercise:
Take countries_data_clean.csv and countries_GDP_clean.csv and join them using country name as key.
Before doing the join, please check how many elements the resulting pair RDD should have.
After the join, check if the initial hypothesis was true.
In case it is not, what is the reason?
How would you resolve that problem?
In [ ]:
#more info: https://www.worlddata.info/downloads/
rdd_countries = sc.textFile("../data/countries_data_clean.csv").map(lambda x: x.split(","))
#more info: http://data.worldbank.org/data-catalog/GDP-ranking-table
rdd_gdp = sc.textFile("../data/countries_GDP_clean.csv").map(lambda x: x.split(";"))
# check rdds size
hyp_final_rdd_num = rdd_gdp.count() if rdd_countries.count() > rdd_gdp.count() else rdd_countries.count()
print("The final number of elements in the joined rdd should be: ", hyp_final_rdd_num)
p_rdd_gdp = rdd_gdp.map(lambda x: (x[3],x))
p_rdd_countries = rdd_countries.map(lambda x: (x[1],x))
print(p_rdd_countries.take(1))
print(p_rdd_gdp.take(1))
p_rdd_country_data = p_rdd_countries.join(p_rdd_gdp)
final_join_rdd_size = p_rdd_country_data.count()
hyp = hyp_final_rdd_num == final_join_rdd_size
print("The initial hypothesis is ", hyp)
if not hyp:
    print("The final joined rdd size is ", final_join_rdd_size)
Sometimes we don’t need the key to be present in both RDDs to want it in our result.
For example, imagine that our list of countries is not complete, and we don't want to miss data if a country is not present in both RDDs.
leftOuterJoin(other) and rightOuterJoin(other) both join pair RDDs together by key, where one of the pair RDDs can be missing the key.
With leftOuterJoin() the resulting pair RDD has entries for each key in the source RDD.
The value associated with each key in the result is a tuple of the value from the source RDD and an Option for the value from the other pair RDD.
In Python, if a value isn’t present None is used; and if the value is present the regular value, without any wrapper, is used.
As with join(), we can have multiple entries for each key; when this occurs, we get the Cartesian product between the two lists of values.
rightOuterJoin() is almost identical to leftOuterJoin() except the key must be present in the other RDD and the tuple has an option for the source rather than the other RDD.
Exercise:
Use two simple RDDs to show the results of left and right outer join.
In [ ]:
n = 5
rdd_1 = sc.parallelize([(x,1) for x in range(n)])
rdd_2 = sc.parallelize([(x*2,1) for x in range(n)])
print("rdd_1: ",rdd_1.collect())
print("rdd_2: ",rdd_2.collect())
print("leftOuterJoin: ",rdd_1.leftOuterJoin(rdd_2).collect())
print("rightOuterJoin: ",rdd_1.rightOuterJoin(rdd_2).collect())
print("join: ", rdd_1.join(rdd_2).collect())
#explore what happens if a key is present twice or more
rdd_3 = sc.parallelize([(x*2,1) for x in range(n)] + [(4,2),(6,4)])
print("rdd_3: ",rdd_3.collect())
print("join: ", rdd_2.join(rdd_3).collect())
Exercise:
Generate two pair RDDs with country info: one with country code and GDP, and another with country code and life expectancy.
Then join them to have a pair RDD with country code plus GDP and life expectancy.
Answer:
Inspect the dataset with GDP.
In [ ]:
rdd_gdp = sc.textFile("../data/countries_GDP_clean.csv").map(lambda x: x.split(";"))
rdd_gdp.take(2)
#generate a pair rdd with countrycode and GDP
rdd_cc_gdp = rdd_gdp.map(lambda x: (x[1],x[4]))
rdd_cc_gdp.take(2)
Inspect the dataset with life expectancy.
In [ ]:
rdd_countries = sc.textFile("../data/countries_data_clean.csv").map(lambda x: x.split(","))
print(rdd_countries.take(2))
#generate a pair rdd with countrycode and life expectancy
#(more info in https://www.worlddata.info/downloads/)
#we don't have countrycode in this dataset, but let's try to add it
#we have a dataset with countrynames and countrycodes
#let's take countryname and ISO 3166-1 alpha3 code
rdd_cc = sc.textFile("../data/countrycodes.csv").\
map(lambda x: x.split(";")).\
map(lambda x: (x[0].strip("\""),x[4].strip("\""))).\
filter(lambda x: x[0] != 'Country (en)')
print(rdd_cc.take(2))
In [ ]:
rdd_cc_info = rdd_countries.map(lambda x: (x[1],x[16]))
rdd_cc_info.take(2)
In [ ]:
#let's count and see if something is missing
print(rdd_cc.count())
print(rdd_cc_info.count())
In [ ]:
#take only values, the name is no longer needed
rdd_name_cc_le = rdd_cc_info.leftOuterJoin(rdd_cc)
rdd_cc_le = rdd_name_cc_le.map(lambda x: x[1])
print(rdd_cc_le.take(5))
print(rdd_cc_le.count())
In [ ]:
#what is missing?
rdd_name_cc_le.filter(lambda x: x[1][1] == None).collect()
#how can we solve this problem??
We have some missing data that we should complete, but we still have quite a lot of data, so let's move on.
Inspect the results of GDP and life expectancy and join them. Is there some data missing?
In [ ]:
print("Is there some data missing?", rdd_cc_gdp.count() != rdd_cc_le.count())
print("GDP dataset: ", rdd_cc_gdp.count())
print("Life expectancy dataset: ", rdd_cc_le.count())
In [ ]:
#lets try to see what happens
print(rdd_cc_le.take(10))
print (rdd_cc_gdp.take(10))
In [ ]:
rdd_cc_gdp_le = rdd_cc_le.map(lambda x: (x[1],x[0])).leftOuterJoin(rdd_cc_gdp)
# for some countries the data is missing
# we have to check if this data is available
# or if there is an error
rdd_cc_gdp_le.take(10)
Exercise: Sort country data by key.
In [ ]:
p_rdd_country_data.sortByKey().take(2)
Exercises:
1. Count countries RDD by key
2. Collect countries RDD as map
3. Lookup Andorra info in countries RDD
In [ ]:
p_rdd_country_data.countByKey()["Andorra"]
In [ ]:
p_rdd_country_data.collectAsMap()["Andorra"]
In [ ]:
#p_rdd_country_data.lookup("Andorra")
(from: Learning Spark - O'Reilly)
Spark programs can choose to control their RDDs’ partitioning to reduce communication.
Partitioning will not be helpful in all applications— for example, if a given RDD is scanned only once, there is no point in partitioning it in advance.
It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.
Spark’s partitioning is available on all RDDs of key/value pairs, and causes the system to group elements based on a function of each key.
Although Spark does not give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node.
Example:
As a simple example, consider an application that keeps a large table of user information in memory—say, an RDD of (UserID, UserInfo) pairs, where UserInfo contains a list of topics the user is subscribed to.
In [ ]:
rdd_userinfo = sc.textFile("../data/users_events_example/user_info_1000users_20topics.csv")\
.filter(lambda x: len(x)>0)\
.map(lambda x: (x.split(",")[0],x.split(",")[1].split("|")))
rdd_userinfo.take(2)
The application periodically combines this table with a smaller file representing events that happened in the past five minutes—say, a table of (UserID, LinkInfo) pairs for users who have clicked a link on a website in those five minutes.
In [ ]:
rdd_userevents = sc.textFile("../data/users_events_example/userevents_*.log")\
.filter(lambda x: len(x))\
.map(lambda x: (x.split(",")[1], [x.split(",")[2]]))
print(rdd_userevents.take(2))
For example, we may wish to count how many users visited a link that was not to one of their subscribed topics. We can perform this combination with Spark’s join() operation, which can be used to group the User Info and LinkInfo pairs for each UserID by key.
In [ ]:
rdd_joined = rdd_userinfo.join(rdd_userevents)
print(rdd_joined.count())
print(rdd_joined.filter(lambda x: (x[1][1][0] not in x[1][0])).count())
print(rdd_joined.filter(lambda x: (x[1][1][0] in x[1][0])).count())
Imagine that we want to count the number of visits to non-subscribed topics using a function.
In [ ]:
rdd_userinfo = sc.textFile("../data/users_events_example/user_info_1000users_20topics.csv")\
.filter(lambda x: len(x)>0)\
.map(lambda x: (x.split(",")[0],x.split(",")[1].split("|"))).persist()
def process_new_logs(event_file_path):
    rdd_userevents = sc.textFile(event_file_path)\
        .filter(lambda x: len(x))\
        .map(lambda x: (x.split(",")[1], [x.split(",")[2]]))
    rdd_joined = rdd_userinfo.join(rdd_userevents)
    print("Number of visits to non-subscribed topics: ",
          rdd_joined.filter(lambda x: (x[1][1][0] not in x[1][0])).count())
process_new_logs("../data/users_events_example/userevents_01012016000500.log")
This code will run fine as is, but it will be inefficient.
This is because the join() operation, called each time process_new_logs() is invoked, does not know anything about how the keys are partitioned in the datasets.
By default, this operation will hash all the keys of both datasets, sending elements with the same key hash across the network to the same machine, and then join together the elements with the same key on that machine (see figure below).
Because we expect the rdd_userinfo table to be much larger than the small log of events seen every five minutes, this wastes a lot of work: the rdd_userinfo table is hashed and shuffled across the network on every call, even though it doesn’t change.
Fixing this is simple: just use the partitionBy() transformation on rdd_userinfo to hash-partition it at the start of the program. In Scala this is done by passing a spark.HashPartitioner object to partitionBy(); in PySpark we simply pass the desired number of partitions.
In [ ]:
rdd_userinfo = sc.textFile("../data/users_events_example/user_info_1000users_20topics.csv")\
.filter(lambda x: len(x)>0)\
.map(lambda x: (x.split(",")[0],x.split(",")[1].split("|"))).partitionBy(10).persist()
rdd_userinfo
The process_new_logs() method can remain unchanged: the rdd_userevents RDD is local to process_new_logs(), and is used only once within this method, so there is no advantage in specifying a partitioner for events.
Because we called partitionBy() when building rdd_userinfo, Spark will now know that it is hash-partitioned, and calls to join() on it will take advantage of this information.
In particular, when we call rdd_userinfo.join(rdd_userevents), Spark will shuffle only the events RDD, sending events with each particular UserID to the machine that contains the corresponding hash partition of rdd_userinfo.
The result is that a lot less data is communicated over the network, and the program runs significantly faster.
More on partitioning:
Note that partitionBy() is a transformation, so it always returns a new RDD—it does not change the original RDD in place. RDDs can never be modified once created. Therefore it is important to persist and save as rdd_userinfo the result of partitionBy(), not the original textFile().
Also, the 10 passed to partitionBy() represents the number of partitions, which will control how many parallel tasks perform further operations on the RDD (e.g., joins); in general, make this at least as large as the number of cores in your cluster.
In fact, many other Spark operations automatically result in an RDD with known partitioning information, and many operations other than join() will take advantage of this information.
For example, sortByKey() and groupByKey() will result in range-partitioned and hash-partitioned RDDs, respectively.
On the other hand, operations like map() cause the new RDD to forget the parent’s partitioning information, because such operations could theoretically modify the key of each record.
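A small sketch of this behaviour, assuming the partitioner attribute exposed by PySpark RDDs:
In [ ]:
pairs = sc.parallelize([("a", 1), ("b", 2), ("c", 3)]).partitionBy(4)
print(pairs.partitioner)                              # set by partitionBy()
print(pairs.mapValues(lambda v: v + 1).partitioner)   # preserved: only the values change
print(pairs.map(lambda kv: kv).partitioner)           # lost: map() could have changed the keys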